<?php
/**
 * Created by zhanghuiliang@xiangwushuo.com
 * User: vincentzhang
 * Date: 2018/5/30
 * Time: 09:20
 */

$broker = '172.17.122.4:9092';
$topic = 'zhanghuiliang';
//$broker = '172.17.222.17:9092';//日志
//$topic = 'shareappclicklog';
$part = 0;
$conf = new \Rdkafka\Conf();
$conf->set('group.id', 'zhanghl');
$conf->set('metadata.broker.list', $broker);
$topicConf = new \Rdkafka\TopicConf();
$topicConf->set('auto.offset.reset', 'smallest');
$conf->setDefaultTopicConf($topicConf);
$consumer = new \Rdkafka\KafkaConsumer($conf);
$consumer->subscribe([$topic]);
echo 'wating...';
while (true) {
    $mes = $consumer->consume(120 * 1000);
    switch ($mes->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            echo 'payload:';
            //$obj->$callback($mes->payload);
            $a = json_decode($mes->payload, true);
            print_r($a);

            break;
        default:
            break;
    }
//    sleep(1);
}

//$rk = new RdKafka\Consumer();
//$rk->setLogLevel(LOG_DEBUG);
//// 指定 broker 地址,多个地址用"," 分割
//$rk->addBrokers("172.17.0.16:9092");
//
//
//$topic = $rk->newTopic("test");
//$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
//
//
//while (true) {
//    // 第一个参数是分区号
//    // 第二个参数是超时时间
//    $msg = $topic->consume(0, 1000);
//    if ($msg->err) {
//        echo $msg->errstr(), "\n";
//        break;
//    } else {
//        echo $msg->payload, "\n";
//    }
//}